Service Mesh Patterns
The Surprising Part: One Slow Service Kills Everything - Here Is the Timing Proof
# cascade_failure_demo.py
# Watch a single slow downstream service saturate an upstream connection pool
import asyncio
import time

import httpx

# Scenario: Upload Service calls Classification Service.
# Classification Service has a memory leak and is responding in 15s instead of 200ms.


async def classify_document_naive(doc_id: str, client: httpx.AsyncClient) -> dict:
    """No timeout. No circuit breaker. Naive implementation."""
    response = await client.post(
        "http://classification-service/classify",
        json={"doc_id": doc_id, "text": "sample text"},
    )
    return response.json()


async def simulate_upload_requests(num_concurrent: int = 50):
    """
    Simulate 50 concurrent upload requests.
    Each calls the slow Classification Service.
    """
    # httpx's default timeout is 5s, not "wait forever": timeout=None disables it to
    # reproduce the naive behaviour. The pool is capped at 50 connections so that
    # 50 slow calls exhaust it (httpx's default cap is 100).
    async with httpx.AsyncClient(
        timeout=None,
        limits=httpx.Limits(max_connections=50),
    ) as client:
        start = time.perf_counter()
        tasks = [
            asyncio.create_task(classify_document_naive(f"doc-{i:04d}", client))
            for i in range(num_concurrent)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        elapsed = time.perf_counter() - start

    successes = sum(1 for r in results if not isinstance(r, Exception))
    errors = num_concurrent - successes
    print(f"Completed in {elapsed:.1f}s")
    print(f"Successes: {successes}, Errors: {errors}")
    print(f"For ~{elapsed:.0f}s every pooled connection was held by a slow call")
    print("Any other request using this client had to queue for a connection")
    print("A health check that depends on this client or the downstream would hang too")

# WITHOUT any resilience patterns:
# - 50 tasks all start simultaneously
# - Classification Service responds in 15s
# - For 15 seconds, 50 coroutines are suspended and all 50 connections are held
# - The event loop itself is idle, but every new request that needs the client
#   waits for a free connection (indefinitely, since timeout=None)
# - If the liveness probe depends on that client or the downstream, it times out
#   → Kubernetes restarts the pod
# - On restart, 50 new requests start → same thing happens
# - DEATH SPIRAL: pod keeps restarting, never recovers
# asyncio.run(simulate_upload_requests(50))
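The demo above needs a real (and really slow) Classification Service. As a hypothetical, scaled-down stand-in, the sketch below models the HTTP client's connection pool as an `asyncio.Semaphore` and the 15s response as a 0.3s `asyncio.sleep` (both are illustration-only assumptions, not httpx internals). It shows the key point: the event loop stays free, yet anything that needs a pooled connection, such as a badly designed health check, waits for the slow calls.

```python
import asyncio
import time

SLOW_SECONDS = 0.3  # scaled-down stand-in for the 15s downstream latency
POOL_SIZE = 5       # scaled-down stand-in for the client's max_connections


async def slow_downstream_call(pool: asyncio.Semaphore) -> None:
    # Each call holds one pooled "connection" for the whole slow response
    async with pool:
        await asyncio.sleep(SLOW_SECONDS)


async def health_check_via_pool(pool: asyncio.Semaphore) -> float:
    # Anti-pattern: a health check that needs a pooled connection.
    # Returns how long it waited for a free slot.
    start = time.perf_counter()
    async with pool:
        pass
    return time.perf_counter() - start


async def run_scenario(num_slow_calls: int) -> float:
    pool = asyncio.Semaphore(POOL_SIZE)
    slow_calls = [
        asyncio.create_task(slow_downstream_call(pool)) for _ in range(num_slow_calls)
    ]
    await asyncio.sleep(0)  # let the slow calls grab their slots first
    waited = await health_check_via_pool(pool)
    await asyncio.gather(*slow_calls)
    return waited


if __name__ == "__main__":
    spare = asyncio.run(run_scenario(POOL_SIZE - 1))
    full = asyncio.run(run_scenario(POOL_SIZE))
    print(f"pool has a spare slot: health check waited {spare:.2f}s")
    print(f"pool saturated:        health check waited {full:.2f}s")
```

With one slot spare the check returns almost immediately; once every slot is held, it waits roughly the full downstream latency, which is exactly what a liveness probe would experience.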
The cascade failure is not theoretical: it is one of the most common production incident patterns in microservice architectures. This lesson gives you the Python code to prevent its most common variants.
What You Will Learn
- Circuit Breaker - states, thresholds, implementing from scratch, adding retries with tenacity
- Retry with Exponential Backoff and Jitter - why naive retry causes thundering herd
- Bulkhead - isolating connection pools with asyncio.Semaphore
- Timeout - connect vs read vs pool timeouts, deadline vs timeout difference
- Service Discovery - DNS-based (Kubernetes), client-side (Consul)
- OpenTelemetry - distributed tracing across services, metrics, structured logging
- Health Checks - liveness vs readiness, checking all dependencies
Prerequisites: Python async/await, FastAPI basics, Docker.
Part 1: The Cascade Failure Anatomy
Without resilience patterns:
Upload Service                 Classification Service
─────────────────              ──────────────────────
Request 1  ──────────────────► [Slow: 15 seconds]
Request 2  ──────────────────► [Slow: 15 seconds]
...                            ...
Request 50 ──────────────────► [Slow: 15 seconds]
t=0s:  50 requests begin. Upload Service's asyncio event loop is healthy.
t=1s:  All 50 are awaiting responses and hold every pooled connection. New requests queue for a connection.
t=5s:  The liveness probe, if it depends on that pool or on the downstream, times out (assuming a 5s probe timeout and failureThreshold: 1). Pod killed.
t=6s:  New pod starts. 50 NEW requests begin immediately.
t=6s:  Classification Service is still slow. Same thing happens.
t=11s: Pod killed again. Death spiral established.
Net effect: Upload Service is completely unavailable even though
Classification Service is just SLOW, not DOWN.
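Why does SLOW hurt as much as DOWN? Little's law (average requests in flight = arrival rate × latency) puts numbers on it. The sketch below assumes a hypothetical steady 20 requests/s into Upload Service and a 50-connection pool; neither figure comes from a real deployment.

```python
def in_flight(arrival_rate_per_s: float, latency_s: float) -> float:
    """Little's law: average number of requests in flight, L = lambda * W."""
    return arrival_rate_per_s * latency_s


def seconds_until_pool_full(pool_size: int, arrival_rate_per_s: float) -> float:
    """While no slow call has finished yet, every new request takes another
    connection, so the pool fills after pool_size / arrival_rate seconds."""
    return pool_size / arrival_rate_per_s


RATE = 20.0  # hypothetical requests/s
POOL = 50    # hypothetical max connections

print(in_flight(RATE, 0.2))    # healthy: about 4 connections busy
print(in_flight(RATE, 15.0))   # slow: about 300 needed, far above the pool of 50
print(seconds_until_pool_full(POOL, RATE))  # seconds until every connection is held
```

At 200ms the service needs about 4 concurrent connections; at 15s it would need about 300, so a 50-connection pool is full after 2.5s, long before the first slow response returns.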
Part 2: Circuit Breaker
The Three States
CLOSED    ──(failures ≥ failure_threshold within the window)──► OPEN
OPEN      ──(open timeout elapses)───────────────────────────► HALF-OPEN
HALF-OPEN ──(success_threshold trial requests succeed)───────► CLOSED
HALF-OPEN ──(a trial request fails)──────────────────────────► OPEN
CLOSED: Normal operation. Requests pass through. Recent failures are counted.
OPEN: Failure threshold exceeded. All requests fail immediately (fail fast).
No requests reach the downstream service - it gets time to recover.
HALF-OPEN: After the open timeout, let trial requests through, one at a time.
If success_threshold consecutive trials succeed → CLOSED. If a trial fails → OPEN again.
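The transition rules above fit in a small lookup table. This is a hypothetical, illustration-only sketch (the `Event` names are made up for it); the full implementation that follows also tracks timestamps and success counts.

```python
from enum import Enum


class State(str, Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class Event(str, Enum):
    FAILURE_THRESHOLD_EXCEEDED = "failure_threshold_exceeded"
    OPEN_TIMEOUT_ELAPSED = "open_timeout_elapsed"
    TRIALS_SUCCEEDED = "trials_succeeded"  # success_threshold trials in a row
    TRIAL_FAILED = "trial_failed"


TRANSITIONS = {
    (State.CLOSED, Event.FAILURE_THRESHOLD_EXCEEDED): State.OPEN,
    (State.OPEN, Event.OPEN_TIMEOUT_ELAPSED): State.HALF_OPEN,
    (State.HALF_OPEN, Event.TRIALS_SUCCEEDED): State.CLOSED,
    (State.HALF_OPEN, Event.TRIAL_FAILED): State.OPEN,
}


def next_state(state: State, event: Event) -> State:
    # Events that do not apply in the current state leave it unchanged
    return TRANSITIONS.get((state, event), state)


state = State.CLOSED
for event in (Event.FAILURE_THRESHOLD_EXCEEDED, Event.OPEN_TIMEOUT_ELAPSED, Event.TRIAL_FAILED):
    state = next_state(state, event)
    print(event.value, "->", state.value)
```

Walking CLOSED → OPEN → HALF_OPEN with a failed trial sends the breaker straight back to OPEN, where the open timeout starts again.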
Circuit Breaker Implementation from Scratch
# resilience/circuit_breaker.py
import asyncio
import logging
import time
from collections import deque
from enum import Enum
from typing import Awaitable, Callable, TypeVar

logger = logging.getLogger("resilience.circuit_breaker")


class CircuitState(str, Enum):
    CLOSED = "closed"        # Normal - requests pass through
    OPEN = "open"            # Tripped - requests fail immediately
    HALF_OPEN = "half_open"  # Testing - one trial request at a time


class CircuitBreakerOpenError(Exception):
    """Raised when a request is rejected because the circuit is open."""

    def __init__(self, service_name: str, seconds_until_retry: float):
        self.service_name = service_name
        self.seconds_until_retry = seconds_until_retry
        super().__init__(
            f"Circuit breaker for '{service_name}' is OPEN. "
            f"Retry in {seconds_until_retry:.1f}s"
        )


T = TypeVar("T")


class CircuitBreaker:
    """
    Coroutine-safe circuit breaker for a single asyncio event loop.
    (asyncio.Lock guards state between coroutines; it is NOT thread-safe.)
    Tracks failure timestamps in a sliding time window.
    """

    def __init__(
        self,
        service_name: str,
        failure_threshold: int = 5,            # Open after this many failures...
        failure_window_seconds: float = 60.0,  # ...within this window
        success_threshold: int = 2,            # Close after this many successes in HALF_OPEN
        timeout_seconds: float = 30.0,         # Stay OPEN this long before trying HALF_OPEN
    ):
        self.service_name = service_name
        self._failure_threshold = failure_threshold
        self._failure_window = failure_window_seconds
        self._success_threshold = success_threshold
        self._timeout = timeout_seconds
        self._state = CircuitState.CLOSED
        self._failure_times: deque[float] = deque()  # Timestamps of recent failures
        self._last_opened_at: float | None = None
        self._consecutive_successes = 0
        self._half_open_probe_in_flight = False  # Only one trial request at a time
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        return self._state

    def _recent_failures(self) -> int:
        """Count failures within the sliding window."""
        cutoff = time.monotonic() - self._failure_window
        # Drop failures that have aged out of the window
        while self._failure_times and self._failure_times[0] < cutoff:
            self._failure_times.popleft()
        return len(self._failure_times)

    async def call(self, func: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
        """Execute a function through the circuit breaker."""
        is_probe = False
        async with self._lock:
            state = self._get_current_state()
            if state == CircuitState.OPEN or (
                state == CircuitState.HALF_OPEN and self._half_open_probe_in_flight
            ):
                # Fail fast: the circuit is open, or a trial request is already in flight
                seconds_until_retry = (
                    self._last_opened_at + self._timeout - time.monotonic()
                )
                raise CircuitBreakerOpenError(self.service_name, max(0.0, seconds_until_retry))
            if state == CircuitState.HALF_OPEN:
                is_probe = True
                self._half_open_probe_in_flight = True
                logger.info(f"Circuit breaker '{self.service_name}': HALF_OPEN - sending test request")
        # Execute the function outside the lock so CLOSED-state calls run concurrently
        try:
            result = await func(*args, **kwargs)
        except Exception as exc:
            await self._on_failure(exc, is_probe)
            raise
        except BaseException:
            # e.g. asyncio.CancelledError: not a downstream failure,
            # but the trial slot must be released
            if is_probe:
                self._half_open_probe_in_flight = False
            raise
        await self._on_success(is_probe)
        return result

    def _get_current_state(self) -> CircuitState:
        """Compute current state, potentially transitioning OPEN → HALF_OPEN."""
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_opened_at >= self._timeout:
                self._state = CircuitState.HALF_OPEN
                self._consecutive_successes = 0
                self._half_open_probe_in_flight = False
                logger.info(f"Circuit breaker '{self.service_name}': OPEN → HALF_OPEN")
        return self._state

    async def _on_success(self, is_probe: bool = False) -> None:
        async with self._lock:
            if is_probe:
                self._half_open_probe_in_flight = False
            if self._state == CircuitState.HALF_OPEN:
                self._consecutive_successes += 1
                if self._consecutive_successes >= self._success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_times.clear()
                    logger.info(f"Circuit breaker '{self.service_name}': HALF_OPEN → CLOSED (recovered)")

    async def _on_failure(self, exc: Exception, is_probe: bool = False) -> None:
        async with self._lock:
            if is_probe:
                self._half_open_probe_in_flight = False
            self._failure_times.append(time.monotonic())
            self._consecutive_successes = 0
            if self._state == CircuitState.HALF_OPEN:
                # Trial request failed - go back to OPEN and restart the timeout
                self._state = CircuitState.OPEN
                self._last_opened_at = time.monotonic()
                logger.warning(
                    f"Circuit breaker '{self.service_name}': HALF_OPEN → OPEN (test failed: {exc})"
                )
            elif self._state == CircuitState.CLOSED:
                recent = self._recent_failures()
                if recent >= self._failure_threshold:
                    self._state = CircuitState.OPEN
                    self._last_opened_at = time.monotonic()
                    logger.error(
                        f"Circuit breaker '{self.service_name}': CLOSED → OPEN "
                        f"({recent} failures in {self._failure_window}s window)"
                    )

    def get_metrics(self) -> dict:
        return {
            "service": self.service_name,
            "state": self._state.value,
            "recent_failures": self._recent_failures(),
            "failure_threshold": self._failure_threshold,
        }
# Usage in the Classification Client
# ClassifierGRPCClient and ClassificationServiceUnavailableError are defined
# elsewhere in the project (not shown).
class ResilientClassifierClient:
    def __init__(self, host: str, port: int):
        self._client = ClassifierGRPCClient(host, port)
        self._circuit_breaker = CircuitBreaker(
            service_name="classification-service",
            failure_threshold=5,
            failure_window_seconds=60,
            timeout_seconds=30,
        )

    async def classify(self, text: str, doc_id: str) -> dict:
        try:
            return await self._circuit_breaker.call(
                self._client.classify, text=text, document_id=doc_id
            )
        except CircuitBreakerOpenError as exc:
            # Service unavailable - raise a domain error or return a degraded response
            logger.warning(f"Circuit open for classification: {exc}")
            # Option 1: Raise to caller
            raise ClassificationServiceUnavailableError(str(exc)) from exc
            # Option 2 (use instead of the raise above): graceful degradation
            # return {"label": "unknown", "confidence": 0.0, "degraded": True}
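Adding Retries with tenacity
tenacity is a retry library: it supplies battle-tested stop, wait, and retry policies, but it does not implement a circuit breaker. In production, keep a circuit breaker (the one above, or a dedicated circuit-breaker library) and let tenacity handle the retries around it: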
# Using tenacity for retries (tenacity has no circuit breaker; the breaker lives in the client)
import logging

import httpx
from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

logger = logging.getLogger("resilience")


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=0.5, max=10),
    retry=retry_if_exception_type(
        (ClassificationServiceUnavailableError, httpx.ConnectError, httpx.TimeoutException)
    ),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
async def classify_with_retry(text: str, doc_id: str, client: ResilientClassifierClient) -> dict:
    return await client.classify(text, doc_id)
Part 3: Retry with Exponential Backoff and Jitter
Why Naive Retry Is Dangerous
# THE THUNDERING HERD PROBLEM
# Scenario: Classification Service has a 30-second restart.
# 1000 Upload Service workers all get connection errors simultaneously.
import asyncio
import logging
import random

logger = logging.getLogger("resilience.retry")


# NAIVE retry (no jitter): every worker sleeps exactly the same amount
async def naive_retry(call_classification, max_attempts: int = 4):
    for attempt in range(max_attempts):
        try:
            return await call_classification()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s

# All 1000 workers:
# t=0s: 1000 requests fail
# t=1s: 1000 workers retry at the same instant → a 1000-request spike
# t=3s: another 1000-request spike
# t=7s: another 1000-request spike
# Each spike is synchronized, so a service that is just coming back up can be
# knocked over again by the next wave: the retry storm keeps it from recovering.

# WITH JITTER - stagger retries randomly:
async def retry_with_jitter(coro_factory, max_attempts=3, base_delay=1.0, max_delay=30.0):
    # Note: httpx raises its own exception types (httpx.ConnectError etc.), which are
    # not subclasses of the built-in ConnectionError; list them here if you use httpx.
    for attempt in range(max_attempts):
        try:
            return await coro_factory()
        except (ConnectionError, TimeoutError) as exc:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff with full jitter
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep_time = random.uniform(0, cap)
            logger.warning(
                f"Attempt {attempt + 1} failed. Retrying in {sleep_time:.2f}s. Error: {exc}"
            )
            await asyncio.sleep(sleep_time)
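To see the difference in numbers, the sketch below computes when 1,000 hypothetical workers would send their retries under both schedules (no network involved) and counts the busiest 100 ms window. It assumes every worker fails at t=0 and retries three times with a 1s base delay, mirroring the timeline above.

```python
import random
from collections import Counter
from typing import List


def retry_send_times(num_workers: int, retries: int, base_delay: float,
                     jitter: bool, seed: int = 0) -> List[float]:
    """Times (seconds after the initial failure) at which retries are sent."""
    rng = random.Random(seed)  # seeded so the result is reproducible
    times = []
    for _ in range(num_workers):
        t = 0.0
        for attempt in range(retries):
            cap = base_delay * (2 ** attempt)
            # Naive: sleep exactly cap. Full jitter: sleep uniform(0, cap).
            t += rng.uniform(0, cap) if jitter else cap
            times.append(t)
    return times


def busiest_window(times: List[float], window_s: float = 0.1) -> int:
    """Largest number of retries landing in any single window."""
    counts = Counter(int(t // window_s) for t in times)
    return max(counts.values())


naive = retry_send_times(1000, retries=3, base_delay=1.0, jitter=False)
jittered = retry_send_times(1000, retries=3, base_delay=1.0, jitter=True)
print("naive peak per 100ms:   ", busiest_window(naive))     # all 1000 at once
print("jittered peak per 100ms:", busiest_window(jittered))  # a small fraction of that
```

Without jitter, every retry wave lands on the same instant (t = 1s, 3s, 7s); with full jitter the same 3,000 retries are smeared across the window.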
Retry Strategies Compared
# These fragments show the delay computation inside a retry loop
# (attempt = 0, 1, 2, ...; base_delay and max_delay in seconds).

# 1. Fixed delay - simple, no backoff
delay = 1.0
await asyncio.sleep(delay)
# Risk: simultaneous retries (thundering herd)

# 2. Exponential backoff - delay grows: 1s, 2s, 4s, 8s...
delay = base_delay * (2 ** attempt)
await asyncio.sleep(min(delay, max_delay))
# Risk: still simultaneous if all callers started at the same time

# 3. Exponential backoff with full jitter (recommended for distributed systems)
cap = min(max_delay, base_delay * (2 ** attempt))
delay = random.uniform(0, cap)
await asyncio.sleep(delay)
# t=0: all fail
# t=retry: each worker sleeps 0-cap seconds at random → retries spread across the window
# The recovering service sees a steady trickle instead of a spike

# 4. Decorrelated jitter (an alternative that grows from the previous delay):
# previous_delay starts at base_delay and carries the last sleep forward
delay = min(max_delay, random.uniform(base_delay, previous_delay * 3))
previous_delay = delay
await asyncio.sleep(delay)
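The four fragments above can be collected into one runnable helper that returns the sequence of sleeps each strategy would produce. The function name and the strategy labels are hypothetical; decorrelated jitter follows the commonly cited form min(max_delay, uniform(base_delay, previous_delay * 3)), with previous_delay starting at base_delay.

```python
import random
from typing import List, Optional


def backoff_delays(strategy: str, attempts: int, base_delay: float = 1.0,
                   max_delay: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return the sleep before each of `attempts` retries for one strategy."""
    rng = rng or random.Random()
    delays = []
    previous_delay = base_delay  # only used by decorrelated jitter
    for attempt in range(attempts):
        if strategy == "fixed":
            delay = base_delay
        elif strategy == "exponential":
            delay = min(max_delay, base_delay * 2 ** attempt)
        elif strategy == "full_jitter":
            delay = rng.uniform(0, min(max_delay, base_delay * 2 ** attempt))
        elif strategy == "decorrelated":
            delay = min(max_delay, rng.uniform(base_delay, previous_delay * 3))
            previous_delay = delay  # the next delay grows from this one
        else:
            raise ValueError(f"unknown strategy: {strategy!r}")
        delays.append(delay)
    return delays


print(backoff_delays("exponential", 6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
print(backoff_delays("full_jitter", 6, rng=random.Random(42)))
print(backoff_delays("decorrelated", 6, rng=random.Random(42)))
```

Full jitter keeps each sleep under the exponential cap but anywhere down to zero; decorrelated jitter never drops below base_delay and drifts upward from the previous sleep.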
Production Retry with tenacity
# upload_service/resilience/retry.py
import logging

import httpx
from tenacity import (
    AsyncRetrying,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)

logger = logging.getLogger("resilience.retry")


async def call_with_retry(
    func,
    *args,
    max_attempts: int = 3,
    min_wait: float = 0.5,
    max_wait: float = 10.0,
    **kwargs,
):
    """
    Retry a coroutine with randomized exponential backoff.

    wait_random_exponential sleeps for a random time drawn uniformly between
    min_wait and multiplier * 2**(n - 1) seconds after the n-th failed attempt
    (multiplier defaults to 1), with that upper bound capped at max_wait.

    Because reraise=True, the last exception is re-raised once attempts are
    exhausted, instead of being wrapped in tenacity's RetryError.
    """
    async for attempt in AsyncRetrying(
        stop=stop_after_attempt(max_attempts),
        wait=wait_random_exponential(min=min_wait, max=max_wait),
        retry=retry_if_exception_type((
            httpx.ConnectError,
            httpx.ReadTimeout,
            httpx.ConnectTimeout,
            ClassificationServiceUnavailableError,  # project-defined, not shown
        )),
        before_sleep=lambda retry_state: logger.warning(
            f"Retry {retry_state.attempt_number}/{max_attempts} for {func.__name__} "
            f"after {retry_state.outcome.exception()}"
        ),
        reraise=True,
    ):
        with attempt:
            return await func(*args, **kwargs)
Part 4: Bulkhead Pattern
The bulkhead pattern caps the resources (concurrent calls, connections) each downstream dependency may use, so one slow or failing service cannot exhaust them and starve calls to other services.
# resilience/bulkhead.py
import asyncio
import logging
import time
from typing import Awaitable, Callable, TypeVar

logger = logging.getLogger("resilience.bulkhead")
T = TypeVar("T")


class BulkheadFullError(Exception):
    """Raised when no slot frees up within max_wait_seconds."""


class Bulkhead:
    """
    Limits concurrent calls to a downstream service.
    Named after ship bulkheads that compartmentalise flooding - if one compartment
    floods, the others are safe.
    If Classification Service is slow, the bulkhead limits it to 10 concurrent
    calls. The 11th caller waits at most max_wait_seconds for a slot, then fails
    instead of blocking indefinitely.
    This protects the rest of Upload Service's capacity.
    """

    def __init__(
        self,
        name: str,
        max_concurrent: int,
        max_wait_seconds: float = 5.0,
    ):
        self.name = name
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._max_wait = max_wait_seconds
        self._max_concurrent = max_concurrent
        self._active = 0
        self._rejected = 0

    async def call(self, func: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
        """Execute a function within the bulkhead limit."""
        try:
            # Wait for a free slot, but only up to max_wait_seconds
            await asyncio.wait_for(
                self._semaphore.acquire(),
                timeout=self._max_wait,
            )
        except asyncio.TimeoutError:
            self._rejected += 1
            logger.warning(
                f"Bulkhead '{self.name}' rejected request. "
                f"Active: {self._active}/{self._max_concurrent}. "
                f"Total rejected: {self._rejected}"
            )
            raise BulkheadFullError(
                f"Bulkhead '{self.name}' is full ({self._max_concurrent} concurrent calls). "
                f"Try again later."
            ) from None
        self._active += 1
        try:
            return await func(*args, **kwargs)
        finally:
            self._active -= 1
            self._semaphore.release()

    def metrics(self) -> dict:
        return {
            "name": self.name,
            "active": self._active,
            "capacity": self._max_concurrent,
            "total_rejected": self._rejected,
        }


# SEPARATE bulkheads for each downstream service:
# Classification Service being slow does NOT affect calls to Storage Service.
# `storage`, `classifier`, `queue` and `notifier` are the project's client
# objects (hypothetical here; not shown).
class ResilientUploadOrchestrator:
    def __init__(self):
        # Each downstream service gets its own isolated pool
        self._classification_bulkhead = Bulkhead(
            name="classification-service",
            max_concurrent=10,  # Max 10 concurrent classification calls
            max_wait_seconds=5.0,
        )
        self._storage_bulkhead = Bulkhead(
            name="storage-service",
            max_concurrent=50,  # Storage is fast, allow more concurrency
            max_wait_seconds=2.0,
        )
        self._notification_bulkhead = Bulkhead(
            name="notification-service",
            max_concurrent=5,  # Notifications are non-critical
            max_wait_seconds=1.0,
        )

    async def process_document(self, doc_id: str, file_bytes: bytes, text: str) -> dict:
        # Storage is isolated - even if classification is at capacity, storage still works
        storage_key = await self._storage_bulkhead.call(
            storage.put, f"raw/{doc_id}", file_bytes
        )
        try:
            label = await self._classification_bulkhead.call(
                classifier.classify, text, doc_id
            )
        except BulkheadFullError:
            # Degrade gracefully: queue for later classification instead of failing
            await queue.push({"doc_id": doc_id, "text": text, "retry_after": time.time() + 60})
            label = "pending"
        # Notification is best-effort: no failure here should affect the response
        try:
            await self._notification_bulkhead.call(
                notifier.notify, doc_id, label
            )
        except Exception as exc:  # includes BulkheadFullError
            logger.warning(f"Notification skipped for doc {doc_id}: {exc!r}")
        return {"doc_id": doc_id, "storage_key": storage_key, "label": label}
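A scaled-down check of the isolation claim: the sketch below uses a hypothetical `MiniBulkhead` (a semaphore plus a bounded wait, the same idea as `Bulkhead` above, trimmed so the example is self-contained) and fake slow and fast services built from `asyncio.sleep`. The slow service's bulkhead fills and rejects the overflow, while calls to the fast service are unaffected.

```python
import asyncio
import time


class BulkheadFull(Exception):
    pass


class MiniBulkhead:
    def __init__(self, max_concurrent: int, max_wait: float):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._max_wait = max_wait

    async def call(self, func):
        try:
            await asyncio.wait_for(self._semaphore.acquire(), timeout=self._max_wait)
        except asyncio.TimeoutError:
            raise BulkheadFull() from None
        try:
            return await func()
        finally:
            self._semaphore.release()


async def slow_service() -> str:  # stand-in for a degraded dependency
    await asyncio.sleep(0.5)
    return "slow"


async def fast_service() -> str:  # stand-in for a healthy dependency
    await asyncio.sleep(0.01)
    return "fast"


async def demo():
    slow_bulkhead = MiniBulkhead(max_concurrent=2, max_wait=0.05)
    fast_bulkhead = MiniBulkhead(max_concurrent=10, max_wait=0.05)
    # Five calls to the slow service: two get slots, three are rejected after 50 ms
    slow_calls = [asyncio.create_task(slow_bulkhead.call(slow_service)) for _ in range(5)]
    start = time.perf_counter()
    fast_results = await asyncio.gather(*(fast_bulkhead.call(fast_service) for _ in range(10)))
    fast_elapsed = time.perf_counter() - start
    slow_results = await asyncio.gather(*slow_calls, return_exceptions=True)
    rejected = sum(isinstance(r, BulkheadFull) for r in slow_results)
    return fast_results, fast_elapsed, rejected


if __name__ == "__main__":
    fast_results, fast_elapsed, rejected = asyncio.run(demo())
    print(f"{len(fast_results)} fast calls in {fast_elapsed:.2f}s; {rejected} slow calls rejected")
```

Because each dependency owns its semaphore, saturating one compartment cannot consume the other's slots.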
Part 5: Timeout Patterns
The Four Timeout Types
# httpx timeout configuration - four separate timeout values
import httpx

timeout = httpx.Timeout(
    connect=2.0,  # Max time to establish a connection to the server
    read=30.0,    # Max time to wait for EACH chunk of the response (per read, not total)
    write=10.0,   # Max time to wait for each chunk of the request body to be sent
    pool=5.0,     # Max time to wait for a free connection from the connection pool
)

# In production, create one AsyncClient at startup and reuse it (it owns the pool)
client = httpx.AsyncClient(timeout=timeout)


async def classify(doc_id: str, text: str) -> dict:
    response = await client.post(
        "http://classification-service/classify",
        json={"doc_id": doc_id, "text": text},
    )
    response.raise_for_status()
    return response.json()

# Why each matters:
# connect: catches unreachable hosts and network routing issues
# read:    catches slow or hanging server handlers (a server that trickles bytes
#          can still take longer than 30s in total - bound that with a deadline)
# write:   catches slow uploads to the server
# pool:    catches connection pool exhaustion (too many concurrent requests)
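httpx's read timeout bounds each individual read, not the whole response, so a server that trickles bytes can keep a call alive far longer than `read=30.0` suggests. The sketch below models that with plain asyncio (no httpx, no network): a hypothetical server sends 10 chunks 50 ms apart, each read has a 100 ms timeout that never fires, and only an outer `asyncio.wait_for` bounds the total.

```python
import asyncio
import time


async def read_trickled_response(chunks: int, chunk_interval: float,
                                 per_read_timeout: float) -> int:
    received = 0
    for _ in range(chunks):
        # The per-read timeout restarts for every chunk, like httpx's read timeout
        await asyncio.wait_for(asyncio.sleep(chunk_interval), timeout=per_read_timeout)
        received += 1
    return received


async def demo():
    start = time.perf_counter()
    received = await read_trickled_response(10, 0.05, per_read_timeout=0.1)
    elapsed = time.perf_counter() - start  # about 0.5s: 5x the per-read timeout, never tripped

    try:
        # An overall budget for the whole response does trip
        await asyncio.wait_for(read_trickled_response(10, 0.05, 0.1), timeout=0.2)
        total_budget_tripped = False
    except asyncio.TimeoutError:
        total_budget_tripped = True
    return received, elapsed, total_budget_tripped


if __name__ == "__main__":
    print(asyncio.run(demo()))
```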
Timeout vs Deadline
# TIMEOUT:  "give this single call N seconds"
# DEADLINE: "the entire operation (possibly spanning multiple retries) must finish by time T"
import asyncio
import random
import time


async def classify_with_deadline(text: str, deadline: float) -> dict:
    """
    Deadline-based timeout: each retry gets whatever time remains.
    This prevents retries from extending the total time beyond the SLA.
    `classifier` is the project's client object (not shown).
    """
    for attempt in range(3):
        time_remaining = deadline - time.monotonic()
        if time_remaining <= 0:
            raise TimeoutError("Deadline exceeded before classification could complete")
        try:
            # Each attempt gets the remaining budget, capped at 10s per attempt
            per_attempt_timeout = min(time_remaining, 10.0)
            return await asyncio.wait_for(
                classifier.classify(text),
                timeout=per_attempt_timeout,
            )
        except asyncio.TimeoutError:
            if attempt == 2:
                raise
            # Recompute: the failed attempt consumed part of the budget
            time_remaining = deadline - time.monotonic()
            backoff = random.uniform(0, max(0.0, min(2 ** attempt, time_remaining / 2)))
            await asyncio.sleep(backoff)

# Usage (inside an async function):
deadline = time.monotonic() + 30.0  # Overall SLA: 30 seconds total
result = await classify_with_deadline(text, deadline=deadline)

# asyncio.wait_for vs httpx timeout:
# asyncio.wait_for: cancels any awaitable after the timeout (general purpose, total time)
# httpx timeout: granular (connect/read/write/pool) - better for HTTP calls
async def classify_with_asyncio_timeout(text: str) -> dict:
    try:
        return await asyncio.wait_for(
            classifier.classify(text),
            timeout=5.0,  # 5 seconds total for this call
        )
    except asyncio.TimeoutError:
        raise ClassificationServiceUnavailableError("Classification timed out after 5s") from None
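To check that the deadline really caps the total, here is a self-contained variant of `classify_with_deadline` that takes the downstream call as a parameter (a hypothetical `make_call` factory) and is exercised against a fake call that never answers. The numbers are scaled down: a 0.3 s deadline and a 0.1 s per-attempt cap.

```python
import asyncio
import random
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_deadline(make_call: Callable[[], Awaitable[T]], deadline: float,
                             max_attempts: int = 3, per_attempt_cap: float = 10.0) -> T:
    for attempt in range(max_attempts):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise asyncio.TimeoutError("deadline exceeded")
        try:
            return await asyncio.wait_for(make_call(), timeout=min(remaining, per_attempt_cap))
        except asyncio.TimeoutError:
            if attempt == max_attempts - 1:
                raise
            # Back off with jitter, but never spend more than half of what is left
            remaining = deadline - time.monotonic()
            await asyncio.sleep(random.uniform(0, max(0.0, min(2 ** attempt, remaining / 2))))
    raise asyncio.TimeoutError("no attempts made")  # only reached if max_attempts < 1


async def never_answers() -> str:  # fake downstream that hangs
    await asyncio.sleep(60)
    return "unreachable"


async def answers_quickly() -> str:  # fake healthy downstream
    await asyncio.sleep(0.01)
    return "ok"


async def demo():
    start = time.monotonic()
    try:
        await call_with_deadline(never_answers, deadline=start + 0.3, per_attempt_cap=0.1)
        outcome = "answered"
    except asyncio.TimeoutError:
        outcome = "timed out"
    elapsed = time.monotonic() - start
    quick = await call_with_deadline(answers_quickly, deadline=time.monotonic() + 0.3)
    return outcome, elapsed, quick


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Three attempts plus backoff all fit inside the 0.3 s budget, whereas three independent 10 s timeouts would have allowed 30 s or more.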
Part 6: Service Discovery
DNS-Based Service Discovery in Kubernetes
In Kubernetes, every Service gets a DNS name automatically. No Consul or custom discovery needed.
# In Kubernetes:
# Service name: classification-service
# Namespace: doc-intelligence
# DNS: classification-service.doc-intelligence.svc.cluster.local
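# From Upload Service pod (same namespace):
CLASSIFICATION_SERVICE_URL = "http://classification-service:50051"
# Kubernetes DNS resolves this to the Service's ClusterIP.
# kube-proxy then spreads traffic across Ready pods, but per TCP connection,
# not per request: a long-lived HTTP/2 or gRPC connection stays pinned to one pod.
# For per-request balancing of gRPC, use a headless Service with client-side
# load balancing, or a service mesh.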
# configuration/kubernetes_config.py
from pydantic_settings import BaseSettings


class ServiceConfig(BaseSettings):
    # pydantic-settings reads each field from the matching environment variable
    # (case-insensitive, e.g. CLASSIFICATION_SERVICE_HOST) and falls back to the
    # default below, so os.getenv() is not needed. Set the values in the
    # Deployment manifest.
    #
    # Caution: with service links enabled (the Kubernetes default), the kubelet
    # injects CLASSIFICATION_SERVICE_SERVICE_HOST / _SERVICE_PORT and also
    # CLASSIFICATION_SERVICE_PORT=tcp://<cluster-ip>:<port> for a Service named
    # classification-service. That last one collides with the port field below and
    # fails int validation: set enableServiceLinks: false in the pod spec, or
    # declare the variable explicitly in the Deployment.
    classification_service_host: str = "classification-service"  # K8s DNS name
    classification_service_port: int = 50051
    processing_service_host: str = "processing-service"
    kafka_bootstrap_servers: str = "kafka:9092"
    redis_url: str = "redis://redis:6379/0"


config = ServiceConfig()
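Kubernetes also injects Docker-link-style environment variables for each Service that already exists in the pod's namespace when the pod starts (unless the pod spec sets `enableServiceLinks: false`). The hypothetical helper below reproduces the naming scheme for one unnamed TCP port to show the collision: a Service named classification-service yields `CLASSIFICATION_SERVICE_PORT=tcp://<ip>:<port>`, which an integer port setting cannot parse. The ClusterIP 10.0.0.11 is a made-up example.

```python
from typing import Dict


def service_link_env(service_name: str, cluster_ip: str, port: int,
                     protocol: str = "tcp") -> Dict[str, str]:
    """Env vars the kubelet injects for one Service port (service links enabled)."""
    prefix = service_name.upper().replace("-", "_")
    proto = protocol.upper()
    url = f"{protocol}://{cluster_ip}:{port}"
    return {
        f"{prefix}_SERVICE_HOST": cluster_ip,
        f"{prefix}_SERVICE_PORT": str(port),
        f"{prefix}_PORT": url,  # Docker-link style: a URL, not a number
        f"{prefix}_PORT_{port}_{proto}": url,
        f"{prefix}_PORT_{port}_{proto}_PROTO": protocol,
        f"{prefix}_PORT_{port}_{proto}_PORT": str(port),
        f"{prefix}_PORT_{port}_{proto}_ADDR": cluster_ip,
    }


env = service_link_env("classification-service", "10.0.0.11", 50051)
print(env["CLASSIFICATION_SERVICE_PORT"])  # tcp://10.0.0.11:50051
try:
    int(env["CLASSIFICATION_SERVICE_PORT"])
except ValueError:
    print("int() fails: the injected value collides with the port setting")
```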
# Kubernetes manifest for Upload Service deployment
UPLOAD_DEPLOYMENT_YAML = """
apiVersion: apps/v1
kind: Deployment
metadata:
  name: upload-service
  namespace: doc-intelligence
spec:
  replicas: 2
  selector:
    matchLabels:
      app: upload-service
  template:
    metadata:
      labels:
        app: upload-service   # must match spec.selector.matchLabels
    spec:
      enableServiceLinks: false   # avoid injected *_PORT=tcp://... env vars
      containers:
        - name: upload-service
          image: doc-intelligence/upload-service:1.0.0
          env:
            - name: CLASSIFICATION_SERVICE_HOST
              value: "classification-service"
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "kafka:9092"
          livenessProbe:
            httpGet:
              path: /health
              port: 8001
            initialDelaySeconds: 10
            periodSeconds: 10
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /readiness
              port: 8001
            initialDelaySeconds: 15
            periodSeconds: 10
            failureThreshold: 3
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
"""
Part 7: Observability with OpenTelemetry
Why OpenTelemetry?
Before OpenTelemetry, you had to choose a vendor (Datadog, New Relic, Jaeger) and use their SDK. Switching vendors required rewriting instrumentation. OpenTelemetry is the vendor-neutral standard - instrument once, export anywhere.
OpenTelemetry SDK
│
├── OTLP Exporter → Jaeger (open source)
├── OTLP Exporter → Tempo (Grafana)
├── OTLP Exporter → Datadog
└── OTLP Exporter → New Relic
You change only the exporter configuration, not the instrumentation code.
Setting Up OpenTelemetry in FastAPI
# upload_service/observability.py
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def setup_observability(service_name: str, otlp_endpoint: str = "http://jaeger:4317"):
    """
    Configure OpenTelemetry tracing and metrics.
    Call once at application startup, before the FastAPI app object is created:
    the global FastAPIInstrumentor().instrument() patches the FastAPI class.

    Jaeger is a tracing backend; it does not store the OTLP metrics sent here.
    To export both, point otlp_endpoint at an OpenTelemetry Collector that
    forwards traces to Jaeger and metrics to a metrics backend.
    """
    # Define the service resource (appears in trace UI)
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": "production",
    })

    # ── Tracing ──────────────────────────────────────────────────────
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True),
            max_queue_size=2048,
            max_export_batch_size=512,
            export_timeout_millis=30000,
        )
    )
    trace.set_tracer_provider(tracer_provider)

    # ── Metrics ──────────────────────────────────────────────────────
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=otlp_endpoint, insecure=True),
        export_interval_millis=10000,  # Export every 10 seconds
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    # ── Auto-instrumentation ─────────────────────────────────────────
    # These create spans automatically for HTTP requests and SQL queries
    FastAPIInstrumentor().instrument()      # or FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()  # Instruments httpx clients (sync and async)
    SQLAlchemyInstrumentor().instrument()   # Patches engine creation; for an existing
                                            # AsyncEngine pass engine=async_engine.sync_engine
    return tracer_provider, meter_provider
Custom Spans and Metrics
# upload_service/routes/upload.py
import time

from fastapi import APIRouter, Depends, UploadFile
from opentelemetry import metrics, trace
from opentelemetry.trace import Status, StatusCode
from sqlalchemy.ext.asyncio import AsyncSession

# get_current_user, get_classifier, get_db_session, storage, generate_doc_id,
# extract_text_preview, ResilientClassifierClient and
# ClassificationServiceUnavailableError are project helpers (not shown).

router = APIRouter()

# Get tracer and meter for this module
tracer = trace.get_tracer("upload_service.routes")
meter = metrics.get_meter("upload_service")

# Custom metrics
upload_counter = meter.create_counter(
    "documents.uploaded",
    description="Number of documents uploaded",
    unit="documents",
)
classification_duration = meter.create_histogram(
    "classification.duration",
    description="Time taken to classify a document",
    unit="ms",
)
upload_size = meter.create_histogram(
    "documents.upload_size_bytes",
    description="Size of uploaded documents",
    unit="bytes",
)


@router.post("/documents/upload")
async def upload_document(
    file: UploadFile,
    current_user: dict = Depends(get_current_user),
    classifier: ResilientClassifierClient = Depends(get_classifier),
    db: AsyncSession = Depends(get_db_session),
):
    # Custom span - adds detail beyond the auto-instrumented request span
    with tracer.start_as_current_span(
        "document.upload",
        attributes={
            "user.id": current_user["id"],
            "document.filename": file.filename or "unknown",
            "document.content_type": file.content_type or "unknown",
        },
    ) as span:
        file_bytes = await file.read()
        file_size = len(file_bytes)
        span.set_attribute("document.size_bytes", file_size)
        upload_size.record(file_size, {"content_type": file.content_type or "unknown"})

        # Store file - nested span for the storage operation
        with tracer.start_as_current_span("storage.put") as storage_span:
            doc_id = generate_doc_id()
            storage_key = await storage.put(f"raw/{doc_id}", file_bytes)
            storage_span.set_attribute("storage.key", storage_key)

        # Classify - nested span with timing
        classify_start = time.perf_counter()
        with tracer.start_as_current_span("classification.classify") as class_span:
            try:
                classification = await classifier.classify(
                    text=await extract_text_preview(file_bytes),
                    doc_id=doc_id,
                )
                label = classification["results"][0]["label"] if classification["results"] else "unknown"
                class_span.set_attribute("classification.label", label)
                class_span.set_attribute("classification.model_version", classification["model_version"])
            except ClassificationServiceUnavailableError as exc:
                class_span.set_status(Status(StatusCode.ERROR, str(exc)))
                class_span.record_exception(exc)
                label = "pending"  # Degrade gracefully
        classify_ms = (time.perf_counter() - classify_start) * 1000
        classification_duration.record(classify_ms, {"label": label})

        # Record successful upload
        upload_counter.add(1, {
            "user.tier": current_user.get("tier", "free"),
            "content_type": file.content_type or "unknown",
            "classification": label,
        })
        span.set_attribute("classification.label", label)
        return {"doc_id": doc_id, "storage_key": storage_key, "label": label}
Distributed Trace Across Two Services
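With auto-instrumentation in both services, when Upload Service calls Classification Service over HTTP with httpx, the trace context is propagated automatically in the W3C traceparent HTTP header, so both services' spans appear in the same trace in Jaeger. (For the gRPC client used earlier, the OpenTelemetry gRPC instrumentation plays the same role.)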
Jaeger UI - Trace View:
Trace ID: abc123...
Duration: 320ms
├── upload-service: POST /documents/upload [0ms → 320ms]
│ ├── upload-service: storage.put [10ms → 25ms]
│ ├── upload-service: HTTP POST /classify [30ms → 300ms] ← httpx auto-instrumented
│ │ └── classification-service: POST /classify [35ms → 295ms] ← propagated trace context
│ │ └── classification-service: model.infer [40ms → 280ms]
│ └── upload-service: sqlalchemy: INSERT documents [305ms → 315ms]
This is the power of distributed tracing - you see the entire request flow across services in one view, with timing and errors.
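The propagated header itself is small. In the W3C Trace Context format (version 00), traceparent is `version-traceid-parentid-flags`: 2, 32, 16 and 2 lowercase hex digits, where an all-zero trace or parent ID is invalid and flag bit 01 means sampled. The sketch below parses it and derives the header a child span would send downstream; it is for illustration only, since the OpenTelemetry propagators already do this for you.

```python
import re
import secrets
from typing import NamedTuple

# Version-00 layout: exactly four dash-separated lowercase hex fields
_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    flags: str

    @property
    def sampled(self) -> bool:
        return int(self.flags, 16) & 0x01 == 1


def parse_traceparent(header: str) -> TraceParent:
    match = _TRACEPARENT.match(header.strip())
    if not match:
        raise ValueError(f"malformed traceparent: {header!r}")
    tp = TraceParent(*match.groups())
    if tp.version == "ff" or tp.trace_id == "0" * 32 or tp.parent_id == "0" * 16:
        raise ValueError(f"invalid traceparent: {header!r}")
    return tp


def child_traceparent(parent: TraceParent) -> str:
    # Same trace ID, fresh 8-byte span ID: this is what joins both services' spans
    span_id = secrets.token_hex(8)
    return f"{parent.version}-{parent.trace_id}-{span_id}-{parent.flags}"


incoming = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(incoming.trace_id, incoming.sampled)
print(child_traceparent(incoming))
```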
Part 8: Health Checks
# upload_service/routes/health.py
import asyncio
import time

from fastapi import APIRouter, Request, status
from fastapi.responses import JSONResponse
from sqlalchemy import text

health_router = APIRouter()


@health_router.get("/health", include_in_schema=False)
async def liveness():
    """
    Kubernetes LIVENESS probe.
    Rule: Only check if this PROCESS is alive.
    Do NOT check downstream dependencies.
    If liveness fails → Kubernetes RESTARTS the container.
    Restarting because a downstream is slow makes a cascade failure worse.
    The downstream being slow is not Upload Service's fault - don't restart for it.
    """
    return {"status": "alive", "timestamp": time.time()}


@health_router.get("/readiness", include_in_schema=False)
async def readiness(request: Request):
    """
    Kubernetes READINESS probe.
    Rule: Check if this service can HANDLE TRAFFIC.
    If readiness fails → Kubernetes stops sending traffic (no restart).
    Check: database, cache, connection pools.
    Do NOT check: downstream services (classification, email).
    Why: if Classification Service is down, Upload Service can still accept
    uploads and classify them later.
    """
    checks = {}
    healthy = True

    # Check 1: Database connection pool
    try:
        factory = request.app.state.db_session_factory
        async with factory() as db:
            await asyncio.wait_for(db.execute(text("SELECT 1")), timeout=2.0)
        checks["database"] = {"status": "ok"}
    except Exception as exc:
        checks["database"] = {"status": "error", "detail": str(exc)[:200]}
        healthy = False

    # Check 2: Redis
    try:
        redis = request.app.state.redis_client
        await asyncio.wait_for(redis.ping(), timeout=1.0)
        checks["redis"] = {"status": "ok"}
    except Exception as exc:
        checks["redis"] = {"status": "error", "detail": str(exc)[:200]}
        healthy = False

    # Check 3: Connection pool health (not the downstream service)
    try:
        http_client = request.app.state.http_client
        # _transport, _pool and _connections are private httpx/httpcore internals;
        # they can change between versions, so treat this as best-effort diagnostics
        pool_info = {
            "connections": len(http_client._transport._pool._connections),
        }
        checks["http_pool"] = {"status": "ok", **pool_info}
    except Exception as exc:
        checks["http_pool"] = {"status": "error", "detail": str(exc)[:200]}
        # Don't mark unhealthy - pool info is nice-to-have

    return JSONResponse(
        status_code=status.HTTP_200_OK if healthy else status.HTTP_503_SERVICE_UNAVAILABLE,
        content={
            "status": "ready" if healthy else "not_ready",
            "checks": checks,
            "timestamp": time.time(),
            # Informational only: breaker and bulkhead state do not affect readiness
            "circuit_breakers": {
                "classification": request.app.state.classifier_client._circuit_breaker.get_metrics(),
            },
            "bulkheads": {
                "classification": request.app.state.orchestrator._classification_bulkhead.metrics(),
                "storage": request.app.state.orchestrator._storage_bulkhead.metrics(),
            },
        },
    )
# Prometheus text-format metrics (pull-based alternative to OTLP push).
# Rather than a hand-written /metrics route, let prometheus-fastapi-instrumentator
# register one when the app is built:
#
#   from prometheus_fastapi_instrumentator import Instrumentator
#   Instrumentator().instrument(app).expose(app)  # serves GET /metrics
#
# To expose the OpenTelemetry metrics from setup_observability() instead, add a
# PrometheusMetricReader (opentelemetry-exporter-prometheus) to the MeterProvider's
# metric_readers and serve the prometheus_client registry it writes to.
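The readiness handler above runs its checks one after another, so its worst case is the sum of their timeouts (2s + 1s). A possible refinement, sketched here with hypothetical fake checks, runs them concurrently with `asyncio.gather` so the probe's latency is bounded by the slowest single check; each check keeps its own timeout, and a failure is reported rather than raised.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, Tuple

Check = Callable[[], Awaitable[None]]


async def run_check(check: Check, timeout: float) -> dict:
    try:
        await asyncio.wait_for(check(), timeout=timeout)
        return {"status": "ok"}
    except Exception as exc:  # includes asyncio.TimeoutError
        return {"status": "error", "detail": repr(exc)[:200]}


async def readiness_report(checks: Dict[str, Tuple[Check, float]]) -> dict:
    """checks maps name -> (async check function, timeout in seconds)."""
    names = list(checks)
    results = await asyncio.gather(*(run_check(fn, t) for fn, t in checks.values()))
    ready = all(r["status"] == "ok" for r in results)
    return {"status": "ready" if ready else "not_ready", "checks": dict(zip(names, results))}


# Fake dependencies standing in for the database and Redis pings
async def fake_db_ok() -> None:
    await asyncio.sleep(0.05)


async def fake_redis_hung() -> None:
    await asyncio.sleep(10)


async def demo():
    start = time.perf_counter()
    report = await readiness_report({
        "database": (fake_db_ok, 0.2),
        "redis": (fake_redis_hung, 0.1),
    })
    return report, time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The hung Redis check is cut off at its own 0.1s timeout and marks the pod not ready, while the total probe time stays near the slowest check instead of the sum.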
Putting It All Together: The Resilient Client
# upload_service/clients/resilient_classifier.py
# Production-ready client combining all patterns
import asyncio

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from tenacity import (
    AsyncRetrying,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)

# ClassifierGRPCClient, CircuitBreaker, CircuitBreakerOpenError, Bulkhead,
# BulkheadFullError and ClassificationServiceUnavailableError are the classes
# built in the earlier sections; import them from wherever you defined them.


class ProductionClassifierClient:
    """
    Combines, from outermost to innermost layer:
    Tracing -> Bulkhead -> Circuit Breaker -> Retry -> Timeout
    """

    def __init__(self, host: str, port: int):
        self._grpc_client = ClassifierGRPCClient(host, port)
        self._circuit_breaker = CircuitBreaker(
            service_name="classification-service",
            failure_threshold=5,
            failure_window_seconds=60,
            timeout_seconds=30,
        )
        self._bulkhead = Bulkhead(
            name="classification-service",
            max_concurrent=10,
            max_wait_seconds=5.0,
        )
        self._tracer = trace.get_tracer("upload_service.classifier_client")

    async def classify(self, text: str, doc_id: str) -> dict:
        with self._tracer.start_as_current_span(
            "classifier.classify",
            attributes={"doc.id": doc_id, "text.length": len(text)},
        ) as span:
            try:
                # Layer 1: Bulkhead (reject after at most max_wait_seconds if at capacity)
                result = await self._bulkhead.call(
                    self._classify_with_circuit_breaker, text, doc_id, span
                )
                span.set_attribute("result.label", result.get("label", "unknown"))
                return result
            except BulkheadFullError as exc:
                span.set_status(Status(StatusCode.ERROR, "Bulkhead full"))
                raise ClassificationServiceUnavailableError(f"Classification busy: {exc}") from exc
            except CircuitBreakerOpenError as exc:
                span.set_attribute("circuit_breaker.state", "open")
                span.set_status(Status(StatusCode.ERROR, "Circuit open"))
                raise ClassificationServiceUnavailableError(f"Circuit open: {exc}") from exc

    async def _classify_with_circuit_breaker(self, text: str, doc_id: str, span) -> dict:
        # Layer 2: Circuit breaker (fail fast if service known to be down).
        # The breaker sees one outcome per retry sequence, not one per attempt.
        return await self._circuit_breaker.call(
            self._classify_with_retry, text, doc_id, span
        )

    async def _classify_with_retry(self, text: str, doc_id: str, span) -> dict:
        # Layer 3: Retry with jitter (handle transient failures).
        # Per-call timeouts must be retryable too, or a slow call is never retried.
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(3),
            wait=wait_random_exponential(min=0.5, max=10),
            retry=retry_if_exception_type(
                (asyncio.TimeoutError, ClassificationServiceUnavailableError)
            ),
            reraise=True,
        ):
            with attempt:
                span.set_attribute("retry.attempt", attempt.retry_state.attempt_number)
                # Layer 4: Per-call timeout
                return await asyncio.wait_for(
                    self._grpc_client.classify(text=text, document_id=doc_id),
                    timeout=10.0,
                )
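Stacking these layers has a latency cost worth computing explicitly. The sketch below gives a conservative upper bound for one `classify()` call with the configuration above. It assumes `max_wait_seconds` bounds the wait for a bulkhead slot and that each backoff sleep is bounded by the `max=10` cap passed to `wait_random_exponential`. The helper `worst_case_latency` is a hypothetical back-of-the-envelope calculator, not part of the client.

```python
def worst_case_latency(bulkhead_wait: float, attempts: int,
                       per_call_timeout: float, max_backoff: float) -> float:
    """Upper bound on classify() latency (hypothetical helper).

    Wait for a bulkhead slot, then every attempt times out, with a capped
    backoff sleep between consecutive attempts.
    """
    return bulkhead_wait + attempts * per_call_timeout + (attempts - 1) * max_backoff


if __name__ == "__main__":
    # Values from ProductionClassifierClient: max_wait_seconds=5.0,
    # stop_after_attempt(3), wait_for(timeout=10.0), wait_random_exponential(max=10).
    budget = worst_case_latency(5.0, 3, 10.0, 10.0)
    print(f"worst case: {budget:.0f}s")  # 5 + 30 + 20 = 55s
```

Up to 50 of those seconds are spent holding one of the ten bulkhead slots. A uniformly slow downstream can therefore keep the bulkhead full for close to a minute, which is the problem a workflow-level deadline addresses.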
Interview Patterns
Q: Describe the three states of a circuit breaker and when each transition occurs.
A: CLOSED is the normal operating state - requests pass through and failures are counted. When the failure count within a sliding window reaches a threshold (e.g., 5 failures within 60 seconds), the circuit transitions to OPEN. In OPEN state, all requests fail immediately with a CircuitBreakerOpenError without touching the downstream service, giving it time to recover. After a configured recovery timeout (e.g., 30 seconds), the circuit transitions to HALF_OPEN and lets a single trial request through. If the trial succeeds, the circuit returns to CLOSED and the failure count resets. If it fails, the circuit returns to OPEN and the recovery timeout starts again.
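The sketch below is a minimal, synchronous version of the three-state machine described above. It assumes a count-in-sliding-window failure policy and a single trial request in HALF_OPEN. `SimpleCircuitBreaker` and its parameter names are illustrative, not the `CircuitBreaker` class used by the client earlier. The clock is injectable so the transitions can be tested without sleeping.

```python
import time
from collections import deque
from enum import Enum
from typing import Callable, Deque, Optional


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SimpleCircuitBreaker:
    """Hypothetical breaker: opens after `failure_threshold` failures inside
    `failure_window` seconds and allows one trial after `open_timeout` seconds."""

    def __init__(self, failure_threshold: int = 5, failure_window: float = 60.0,
                 open_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.failure_window = failure_window
        self.open_timeout = open_timeout
        self._clock = clock
        self._failures: Deque[float] = deque()  # timestamps of recent failures
        self._opened_at: Optional[float] = None
        self._state = State.CLOSED
        self._trial_in_flight = False

    @property
    def state(self) -> State:
        # OPEN becomes HALF_OPEN lazily, once the recovery timeout has elapsed.
        if self._state is State.OPEN and self._clock() - self._opened_at >= self.open_timeout:
            self._state = State.HALF_OPEN
            self._trial_in_flight = False
        return self._state

    def allow_request(self) -> bool:
        state = self.state
        if state is State.CLOSED:
            return True
        if state is State.HALF_OPEN and not self._trial_in_flight:
            self._trial_in_flight = True  # let exactly one trial request through
            return True
        return False  # OPEN, or HALF_OPEN with the trial already in flight

    def record_success(self) -> None:
        if self.state is State.HALF_OPEN:
            self._state = State.CLOSED  # trial succeeded: back to normal
            self._failures.clear()

    def record_failure(self) -> None:
        now = self._clock()
        if self.state is State.HALF_OPEN:
            self._trip(now)  # trial failed: straight back to OPEN
            return
        self._failures.append(now)
        # Forget failures that have aged out of the sliding window.
        while self._failures and now - self._failures[0] > self.failure_window:
            self._failures.popleft()
        if len(self._failures) >= self.failure_threshold:
            self._trip(now)

    def _trip(self, now: float) -> None:
        self._state = State.OPEN
        self._opened_at = now
        self._failures.clear()
```

A caller checks `allow_request()` before each downstream call and reports the outcome with `record_success()` or `record_failure()`. When `allow_request()` returns False, the caller raises its own "circuit open" error instead of making the call.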
Q: Why is naive exponential backoff (without jitter) dangerous in a microservice architecture?
A: If 1,000 service instances all encounter the same failure simultaneously and retry with identical exponential backoff schedules (wait 1s, then 2s, then 4s), they all retry at exactly the same moments (t+1s, t+3s, t+7s). This creates a "thundering herd" - the recovering downstream service receives 1,000 simultaneous requests at each retry, potentially crashing again. Adding jitter (a random delay from 0 to the exponential cap, often called "full jitter") spreads the retries across the whole backoff window. The recovering service then sees a lower, steadier request rate instead of periodic spikes.
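The sketch below makes the effect numerical. It computes each client's retry times under plain exponential backoff and under full jitter (each delay drawn uniformly between 0 and the exponential cap). It then reports the largest number of retries that land in any 100 ms bucket. The base delay, cap, client count, and bucket width are illustrative assumptions. The tenacity `wait_random_exponential` used in the client above applies the same idea.

```python
import random
from collections import Counter
from typing import List


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 10.0) -> List[float]:
    """Plain exponential backoff: base, 2*base, 4*base, ... capped at `cap`."""
    return [min(cap, base * 2 ** i) for i in range(attempts)]


def full_jitter_delays(attempts: int, rng: random.Random,
                       base: float = 1.0, cap: float = 10.0) -> List[float]:
    """Full jitter: each delay is uniform in [0, exponential cap]."""
    return [rng.uniform(0, d) for d in backoff_delays(attempts, base, cap)]


def peak_retries_per_bucket(schedules: List[List[float]], bucket_seconds: float = 0.1) -> int:
    """Largest number of retries (across all clients) landing in one time bucket."""
    buckets: Counter = Counter()
    for delays in schedules:
        t = 0.0
        for d in delays:
            t += d  # retries happen at cumulative offsets from the failure
            buckets[int(t / bucket_seconds)] += 1
    return max(buckets.values())


if __name__ == "__main__":
    rng = random.Random(42)
    clients = 1000
    plain = [backoff_delays(3) for _ in range(clients)]
    jittered = [full_jitter_delays(3, rng) for _ in range(clients)]
    print("peak without jitter:", peak_retries_per_bucket(plain))  # all 1000 at once
    print("peak with full jitter:", peak_retries_per_bucket(jittered))
```

Without jitter, every client's retry lands in the same bucket. With full jitter, the same 3,000 retries are smeared across several seconds.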
Q: What is the difference between a timeout and a deadline?
A: A timeout is scoped to a single operation ("wait at most 10 seconds for this call"). A deadline is scoped to an entire workflow ("this entire chain of operations must complete by timestamp T"). The difference matters when retrying. With per-call timeouts and retries, the worst-case elapsed time is timeout × max_attempts plus the backoff delays between attempts (3 × 10s plus backoff for the client above), which can greatly exceed the user's SLA. With a deadline, each retry gets only the time remaining in the deadline budget, so the total wall-clock time stays bounded.
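The following is a minimal asyncio sketch of a deadline budget. It assumes the caller fixes an absolute deadline on the event loop's clock (`loop.time()`), and each attempt receives the smaller of its own timeout and the time that remains. `call_with_deadline` and `DeadlineExceeded` are hypothetical names for illustration. Only timeouts are retried here.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class DeadlineExceeded(Exception):
    """The workflow-level time budget ran out before an attempt could start."""


async def call_with_deadline(
    op: Callable[[], Awaitable[T]],
    deadline: float,
    max_attempts: int = 3,
    per_call_timeout: float = 10.0,
    backoff: float = 0.5,
) -> T:
    """Retry `op` on timeout without running past `deadline` (a loop.time() value).
    Exceptions other than timeouts propagate immediately."""
    loop = asyncio.get_running_loop()
    for attempt in range(1, max_attempts + 1):
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise DeadlineExceeded(f"budget exhausted before attempt {attempt}")
        try:
            # Each attempt gets its own timeout or the budget left, whichever is smaller.
            return await asyncio.wait_for(op(), timeout=min(per_call_timeout, remaining))
        except asyncio.TimeoutError:
            if attempt == max_attempts:
                raise  # out of attempts
        # Back off before the next attempt, but never sleep past the deadline.
        await asyncio.sleep(max(0.0, min(backoff, deadline - loop.time())))
    raise DeadlineExceeded("max_attempts must be at least 1")


async def _demo() -> None:
    async def slow_downstream() -> str:
        await asyncio.sleep(5)  # far slower than the caller's budget
        return "late"

    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        await call_with_deadline(slow_downstream, deadline=start + 1.0)
    except (DeadlineExceeded, asyncio.TimeoutError) as exc:
        # About 1s, versus 30s or more with per-call timeouts alone.
        print(f"gave up after {loop.time() - start:.1f}s: {type(exc).__name__}")


if __name__ == "__main__":
    asyncio.run(_demo())
```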
Q: What is the bulkhead pattern, and what problem does it prevent?
A: The bulkhead pattern gives each downstream service its own fixed number of concurrent call slots (typically an asyncio.Semaphore per dependency). If the Classification Service is slow and its bulkhead of 10 slots fills up, the 11th caller waits at most max_wait_seconds for a slot and then fails with a BulkheadFullError instead of queueing indefinitely. Without bulkheads, a slow downstream can tie up all of the caller's connections and coroutines, blocking calls to other services that are healthy. Bulkheads isolate the blast radius - one slow service exhausts only its own pool.
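Here is a minimal bulkhead sketch on top of `asyncio.Semaphore`. It assumes the behavior described above: a fixed number of concurrent slots per downstream and a bounded wait for a free slot before failing. `SimpleBulkhead` and `BulkheadFull` are illustrative names, not the `Bulkhead` class used by the client earlier.

```python
import asyncio
from typing import Any, Awaitable, Callable


class BulkheadFull(Exception):
    """No slot became free within max_wait_seconds."""


class SimpleBulkhead:
    """Caps concurrent calls to ONE downstream; create one per dependency."""

    def __init__(self, max_concurrent: int, max_wait_seconds: float) -> None:
        self._slots = asyncio.Semaphore(max_concurrent)
        self._max_wait = max_wait_seconds

    async def call(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        try:
            # Wait a bounded time for a free slot instead of queueing forever.
            await asyncio.wait_for(self._slots.acquire(), timeout=self._max_wait)
        except asyncio.TimeoutError:
            raise BulkheadFull(f"no free slot within {self._max_wait}s") from None
        try:
            return await fn(*args)
        finally:
            self._slots.release()  # always hand the slot back


async def demo() -> list:
    # Create the semaphores inside the running event loop.
    slow = SimpleBulkhead(max_concurrent=2, max_wait_seconds=0.1)
    healthy = SimpleBulkhead(max_concurrent=2, max_wait_seconds=0.1)

    async def slow_downstream() -> str:
        await asyncio.sleep(0.5)
        return "slow ok"

    async def fast_downstream() -> str:
        return "fast ok"

    # Five callers hit the slow service; one caller hits a healthy one.
    return await asyncio.gather(
        *(slow.call(slow_downstream) for _ in range(5)),
        healthy.call(fast_downstream),
        return_exceptions=True,
    )


if __name__ == "__main__":
    for r in asyncio.run(demo()):
        print(repr(r))
```

Two slow calls take the slots and finish after 0.5 s. The other three fail with `BulkheadFull` after 0.1 s. The call to the healthy service succeeds immediately because it draws from a separate pool.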
Q: In OpenTelemetry, how does a trace span from Upload Service appear alongside a span from Classification Service in Jaeger?
A: When HTTPXClientInstrumentor is active, each outbound HTTP request from Upload Service carries a W3C traceparent header containing the current trace ID and the ID of the client span. When Classification Service (also instrumented, e.g. with FastAPI auto-instrumentation) receives the request, it extracts this header and creates its server span as a child of that client span, under the same trace ID. As long as both services export spans to the same backend, Jaeger groups them by trace ID and renders them in a single trace waterfall, showing the parent-child relationship and the timing of the cross-service call.
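The W3C Trace Context `traceparent` header has the form `version-traceid-parentid-flags`. It consists of a 2-hex-digit version, a 32-hex-digit trace ID, a 16-hex-digit parent span ID, and 2 hex digits of flags (bit `01` = sampled). OpenTelemetry's propagators build and parse this header for you. The stdlib-only sketch below just makes the propagation step visible. `make_traceparent`, `parse_traceparent`, and `start_child_span` are hypothetical helpers, and only the four-field layout is accepted.

```python
import re
import secrets
from typing import NamedTuple, Optional

_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class SpanContext(NamedTuple):
    trace_id: str  # 32 lowercase hex chars
    span_id: str   # 16 lowercase hex chars
    sampled: bool


def make_traceparent(ctx: SpanContext) -> str:
    """Caller side: encode the current span as the outbound header value."""
    return f"00-{ctx.trace_id}-{ctx.span_id}-{'01' if ctx.sampled else '00'}"


def parse_traceparent(header: str) -> Optional[SpanContext]:
    """Callee side: decode the header, or return None if it is invalid."""
    m = _TRACEPARENT.match(header.strip())
    if m is None:
        return None
    version, trace_id, span_id, flags = m.groups()
    # The spec forbids version ff and all-zero trace or span IDs.
    if version == "ff" or set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    return SpanContext(trace_id, span_id, sampled=bool(int(flags, 16) & 0x01))


def start_child_span(parent: SpanContext) -> SpanContext:
    """Same trace ID, fresh 8-byte span ID: this is what links the two services."""
    return SpanContext(parent.trace_id, secrets.token_hex(8), parent.sampled)


if __name__ == "__main__":
    upload_span = SpanContext(secrets.token_hex(16), secrets.token_hex(8), True)
    header = make_traceparent(upload_span)       # sent by Upload Service
    server_span = start_child_span(parse_traceparent(header))  # Classification Service
    print(header)
    print(server_span.trace_id == upload_span.trace_id)  # True: same trace in Jaeger
```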
